/*
  Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com>
  This file is part of GlusterFS.

  This file is licensed to you under your choice of the GNU Lesser
  General Public License, version 3 or any later version (LGPLv3 or
  later), or the GNU General Public License, version 2 (GPLv2), in all
  cases as published by the Free Software Foundation.
*/

#include "dht-common.h"

/* TODO: all 'TODO's in dht.c holds good */

extern struct volume_options dht_options[];

int
nufa_local_lookup_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                      int op_ret, int op_errno, inode_t *inode,
                      struct iatt *stbuf, dict_t *xattr,
                      struct iatt *postparent)
{
    xlator_t *subvol = NULL;
    char is_linkfile = 0;
    char is_dir = 0;
    dht_conf_t *conf = NULL;
    dht_local_t *local = NULL;
    loc_t *loc = NULL;
    int i = 0;
    xlator_t *prev = NULL;
    int call_cnt = 0;
    int ret = 0;

    conf = this->private;

    prev = cookie;
    local = frame->local;
    loc = &local->loc;

    if (ENTRY_MISSING(op_ret, op_errno)) {
        if (conf->search_unhashed) {
            local->op_errno = ENOENT;
            dht_lookup_everywhere(frame, this, loc);
            return 0;
        }
    }

    if (op_ret == -1)
        goto out;

    is_linkfile = check_is_linkfile(inode, stbuf, xattr, conf->link_xattr_name);
    is_dir = check_is_dir(inode, stbuf, xattr);

    if (!is_dir && !is_linkfile) {
        /* non-directory and not a linkfile */
        ret = dht_layout_preset(this, prev, inode);
        if (ret < 0) {
            gf_msg_debug(this->name, 0,
                         "could not set pre-set layout for subvol"
                         " %s",
                         prev->name);
            op_ret = -1;
            op_errno = EINVAL;
            goto err;
        }

        goto out;
    }

    if (is_dir) {
        call_cnt = conf->subvolume_cnt;
        local->call_cnt = call_cnt;

        local->inode = inode_ref(inode);
        local->xattr = dict_ref(xattr);

        local->op_ret = 0;
        local->op_errno = 0;

        local->layout = dht_layout_new(this, conf->subvolume_cnt);
        if (!local->layout) {
            op_ret = -1;
            op_errno = ENOMEM;
            goto err;
        }

        for (i = 0; i < call_cnt; i++) {
            STACK_WIND_COOKIE(frame, dht_lookup_dir_cbk, conf->subvolumes[i],
                              conf->subvolumes[i],
                              conf->subvolumes[i]->fops->lookup, &local->loc,
                              local->xattr_req);
        }
    }

    if (is_linkfile) {
        subvol = dht_linkfile_subvol(this, inode, stbuf, xattr);

        if (!subvol) {
            gf_msg_debug(this->name, 0,
                         "linkfile has no link subvolume. path=%s", loc->path);
            dht_lookup_everywhere(frame, this, loc);
            return 0;
        }

        STACK_WIND_COOKIE(frame, dht_lookup_linkfile_cbk, subvol, subvol,
                          subvol->fops->lookup, &local->loc, local->xattr_req);
    }

    return 0;

out:
    if (!local->hashed_subvol) {
        gf_msg_debug(this->name, 0, "no subvolume in layout for path=%s",
                     local->loc.path);
        local->op_errno = ENOENT;
        dht_lookup_everywhere(frame, this, loc);
        return 0;
    }

    STACK_WIND_COOKIE(frame, dht_lookup_cbk, local->hashed_subvol,
                      local->hashed_subvol, local->hashed_subvol->fops->lookup,
                      &local->loc, local->xattr_req);

    return 0;

err:
    DHT_STACK_UNWIND(lookup, frame, op_ret, op_errno, inode, stbuf, xattr,
                     postparent);
    return 0;
}

int
nufa_lookup(call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xattr_req)
{
    xlator_t *hashed_subvol = NULL;
    xlator_t *subvol = NULL;
    dht_local_t *local = NULL;
    dht_conf_t *conf = NULL;
    int ret = -1;
    int op_errno = -1;
    dht_layout_t *layout = NULL;
    int i = 0;
    int call_cnt = 0;

    VALIDATE_OR_GOTO(frame, err);
    VALIDATE_OR_GOTO(this, err);
    VALIDATE_OR_GOTO(loc, err);
    VALIDATE_OR_GOTO(loc->inode, err);
    VALIDATE_OR_GOTO(loc->path, err);

    conf = this->private;

    local = dht_local_init(frame, loc, NULL, GF_FOP_LOOKUP);
    if (!local) {
        op_errno = ENOMEM;
        goto err;
    }

    if (xattr_req) {
        local->xattr_req = dict_ref(xattr_req);
    } else {
        local->xattr_req = dict_new();
    }

    hashed_subvol = dht_subvol_get_hashed(this, &local->loc);

    local->hashed_subvol = hashed_subvol;

    if (is_revalidate(loc)) {
        layout = local->layout;
        if (!layout) {
            gf_msg_debug(this->name, 0,
                         "revalidate lookup without cache. "
                         "path=%s",
                         loc->path);
            op_errno = EINVAL;
            goto err;
        }

        if (layout->gen && (layout->gen < conf->gen)) {
            gf_msg_debug(this->name, 0, "incomplete layout failure for path=%s",
                         loc->path);
            dht_layout_unref(this, local->layout);
            goto do_fresh_lookup;
        }

        local->inode = inode_ref(loc->inode);

        local->call_cnt = layout->cnt;
        call_cnt = local->call_cnt;

        /* NOTE: we don't require 'trusted.glusterfs.dht.linkto' attribute,
         *       revalidates directly go to the cached-subvolume.
         */
        ret = dict_set_uint32(local->xattr_req, conf->xattr_name, 4 * 4);
        if (ret < 0) {
            gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_DICT_SET_FAILED,
                   "Failed to set dict value.");
            op_errno = -1;
            goto err;
        }

        for (i = 0; i < layout->cnt; i++) {
            subvol = layout->list[i].xlator;

            STACK_WIND_COOKIE(frame, dht_revalidate_cbk, subvol, subvol,
                              subvol->fops->lookup, loc, local->xattr_req);

            if (!--call_cnt)
                break;
        }
    } else {
    do_fresh_lookup:
        ret = dict_set_uint32(local->xattr_req, conf->xattr_name, 4 * 4);
        if (ret < 0) {
            gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_DICT_SET_FAILED,
                   "Failed to set dict value.");
            op_errno = -1;
            goto err;
        }

        ret = dict_set_uint32(local->xattr_req, conf->link_xattr_name, 256);
        if (ret < 0) {
            gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_DICT_SET_FAILED,
                   "Failed to set dict value.");
            op_errno = -1;
            goto err;
        }

        /* Send it to only local volume */
        STACK_WIND_COOKIE(
            frame, nufa_local_lookup_cbk, ((xlator_t *)conf->private),
            ((xlator_t *)conf->private),
            ((xlator_t *)conf->private)->fops->lookup, loc, local->xattr_req);
    }

    return 0;

err:
    op_errno = (op_errno == -1) ? errno : op_errno;
    DHT_STACK_UNWIND(lookup, frame, -1, op_errno, NULL, NULL, NULL, NULL);
    return 0;
}

int
nufa_create_linkfile_create_cbk(call_frame_t *frame, void *cookie,
                                xlator_t *this, int op_ret, int op_errno,
                                inode_t *inode, struct iatt *stbuf,
                                struct iatt *preparent, struct iatt *postparent,
                                dict_t *xdata)
{
    dht_local_t *local = NULL;

    local = frame->local;

    if (op_ret == -1)
        goto err;

    STACK_WIND_COOKIE(frame, dht_create_cbk, local->cached_subvol,
                      local->cached_subvol, local->cached_subvol->fops->create,
                      &local->loc, local->flags, local->mode, local->umask,
                      local->fd, local->params);

    return 0;

err:
    DHT_STACK_UNWIND(create, frame, -1, op_errno, NULL, NULL, NULL, NULL, NULL,
                     NULL);
    return 0;
}

int
nufa_create(call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags,
            mode_t mode, mode_t umask, fd_t *fd, dict_t *params)
{
    dht_local_t *local = NULL;
    dht_conf_t *conf = NULL;
    xlator_t *subvol = NULL;
    xlator_t *avail_subvol = NULL;
    int op_errno = -1;

    VALIDATE_OR_GOTO(frame, err);
    VALIDATE_OR_GOTO(this, err);
    VALIDATE_OR_GOTO(loc, err);

    conf = this->private;

    dht_get_du_info(frame, this, loc);

    local = dht_local_init(frame, loc, fd, GF_FOP_CREATE);
    if (!local) {
        op_errno = ENOMEM;
        goto err;
    }

    subvol = dht_subvol_get_hashed(this, loc);
    if (!subvol) {
        gf_msg_debug(this->name, 0, "no subvolume in layout for path=%s",
                     loc->path);
        op_errno = ENOENT;
        goto err;
    }

    avail_subvol = conf->private;
    if (dht_is_subvol_filled(this, (xlator_t *)conf->private)) {
        avail_subvol = dht_free_disk_available_subvol(
            this, (xlator_t *)conf->private, local);
    }

    if (subvol != avail_subvol) {
        /* create a link file instead of actual file */
        local->params = dict_ref(params);
        local->mode = mode;
        local->flags = flags;
        local->umask = umask;
        local->cached_subvol = avail_subvol;
        dht_linkfile_create(frame, nufa_create_linkfile_create_cbk, this,
                            avail_subvol, subvol, loc);
        return 0;
    }

    gf_msg_trace(this->name, 0, "creating %s on %s", loc->path, subvol->name);

    STACK_WIND_COOKIE(frame, dht_create_cbk, subvol, subvol,
                      subvol->fops->create, loc, flags, mode, umask, fd,
                      params);

    return 0;

err:
    op_errno = (op_errno == -1) ? errno : op_errno;
    DHT_STACK_UNWIND(create, frame, -1, op_errno, NULL, NULL, NULL, NULL, NULL,
                     NULL);

    return 0;
}

int
nufa_mknod_linkfile_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
                        int op_ret, int op_errno, inode_t *inode,
                        struct iatt *stbuf, struct iatt *preparent,
                        struct iatt *postparent, dict_t *xdata)
{
    dht_local_t *local = NULL;

    local = frame->local;
    if (!local || !local->cached_subvol) {
        op_errno = EINVAL;
        op_ret = -1;
        goto err;
    }

    if (op_ret >= 0) {
        STACK_WIND_COOKIE(
            frame, dht_newfile_cbk, (void *)local->cached_subvol,
            local->cached_subvol, local->cached_subvol->fops->mknod,
            &local->loc, local->mode, local->rdev, local->umask, local->params);

        return 0;
    }
err:
    WIPE(postparent);
    WIPE(preparent);

    DHT_STACK_UNWIND(link, frame, op_ret, op_errno, inode, stbuf, preparent,
                     postparent, xdata);
    return 0;
}

int
nufa_mknod(call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode,
           dev_t rdev, mode_t umask, dict_t *params)
{
    dht_local_t *local = NULL;
    dht_conf_t *conf = NULL;
    xlator_t *subvol = NULL;
    xlator_t *avail_subvol = NULL;
    int op_errno = -1;

    VALIDATE_OR_GOTO(frame, err);
    VALIDATE_OR_GOTO(this, err);
    VALIDATE_OR_GOTO(loc, err);

    conf = this->private;

    dht_get_du_info(frame, this, loc);

    local = dht_local_init(frame, loc, NULL, GF_FOP_MKNOD);
    if (!local) {
        op_errno = ENOMEM;
        goto err;
    }

    subvol = dht_subvol_get_hashed(this, loc);
    if (!subvol) {
        gf_msg_debug(this->name, 0, "no subvolume in layout for path=%s",
                     loc->path);
        op_errno = ENOENT;
        goto err;
    }

    /* Consider the disksize in consideration */
    avail_subvol = conf->private;
    if (dht_is_subvol_filled(this, (xlator_t *)conf->private)) {
        avail_subvol = dht_free_disk_available_subvol(
            this, (xlator_t *)conf->private, local);
    }

    if (avail_subvol != subvol) {
        /* Create linkfile first */

        local->params = dict_ref(params);
        local->mode = mode;
        local->umask = umask;
        local->rdev = rdev;
        local->cached_subvol = avail_subvol;

        dht_linkfile_create(frame, nufa_mknod_linkfile_cbk, this, avail_subvol,
                            subvol, loc);
        return 0;
    }

    gf_msg_trace(this->name, 0, "creating %s on %s", loc->path, subvol->name);

    STACK_WIND_COOKIE(frame, dht_newfile_cbk, (void *)subvol, subvol,
                      subvol->fops->mknod, loc, mode, rdev, umask, params);

    return 0;

err:
    op_errno = (op_errno == -1) ? errno : op_errno;
    DHT_STACK_UNWIND(mknod, frame, -1, op_errno, NULL, NULL, NULL, NULL, NULL);

    return 0;
}

gf_boolean_t
same_first_part(char *str1, char term1, char *str2, char term2)
{
    gf_boolean_t ended1;
    gf_boolean_t ended2;

    for (;;) {
        ended1 = ((*str1 == '\0') || (*str1 == term1));
        ended2 = ((*str2 == '\0') || (*str2 == term2));
        if (ended1 && ended2) {
            return _gf_true;
        }
        if (ended1 || ended2 || (*str1 != *str2)) {
            return _gf_false;
        }
        ++str1;
        ++str2;
    }
}

typedef struct nufa_args {
    xlator_t *this;
    char *volname;
    gf_boolean_t addr_match;
} nufa_args_t;

static void
nufa_find_local_brick(xlator_t *xl, void *data)
{
    nufa_args_t *args = data;
    xlator_t *this = args->this;
    char *local_volname = args->volname;
    gf_boolean_t addr_match = args->addr_match;
    char *brick_host = NULL;
    dht_conf_t *conf = this->private;
    int ret = -1;

    /*This means a local subvol was already found. We pick the first brick
     * that is local*/
    if (conf->private)
        return;

    if (strcmp(xl->name, local_volname) == 0) {
        conf->private = xl;
        gf_msg(this->name, GF_LOG_INFO, 0, DHT_MSG_SUBVOL_INFO,
               "Using specified subvol %s", local_volname);
        return;
    }

    if (!addr_match)
        return;

    ret = dict_get_str(xl->options, "remote-host", &brick_host);
    if ((ret == 0) && (gf_is_same_address(local_volname, brick_host) ||
                       gf_is_local_addr(brick_host))) {
        conf->private = xl;
        gf_msg(this->name, GF_LOG_INFO, 0, DHT_MSG_SUBVOL_INFO,
               "Using the first local "
               "subvol %s",
               xl->name);
        return;
    }
}

static void
nufa_to_dht(xlator_t *this)
{
    GF_ASSERT(this);
    GF_ASSERT(this->fops);

    this->fops->lookup = dht_lookup;
    this->fops->create = dht_create;
    this->fops->mknod = dht_mknod;
}

int
nufa_find_local_subvol(xlator_t *this, void (*fn)(xlator_t *each, void *data),
                       void *data)
{
    int ret = -1;
    dht_conf_t *conf = this->private;
    xlator_list_t *trav = NULL;
    xlator_t *parent = NULL;
    xlator_t *candidate = NULL;

    xlator_foreach_depth_first(this, fn, data);
    if (!conf->private) {
        gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_BRICK_ERROR,
               "Couldn't find a local "
               "brick");
        return -1;
    }

    candidate = conf->private;
    trav = candidate->parents;
    while (trav) {
        parent = trav->xlator;
        if (strcmp(parent->type, "cluster/nufa") == 0) {
            gf_msg(this->name, GF_LOG_INFO, 0, DHT_MSG_SUBVOL_INFO,
                   "Found local subvol, "
                   "%s",
                   candidate->name);
            ret = 0;
            conf->private = candidate;
            break;
        }

        candidate = parent;
        trav = parent->parents;
    }

    return ret;
}

int
nufa_init(xlator_t *this)
{
    data_t *data = NULL;
    char *local_volname = NULL;
    int ret = -1;
    char my_hostname[256];
    gf_boolean_t addr_match = _gf_false;
    nufa_args_t args = {
        0,
    };

    ret = dht_init(this);
    if (ret) {
        return ret;
    }

    if ((data = dict_get(this->options, "local-volume-name"))) {
        local_volname = data->data;

    } else {
        addr_match = _gf_true;
        local_volname = "localhost";
        ret = gethostname(my_hostname, 256);
        if (ret == 0)
            local_volname = my_hostname;

        else
            gf_msg(this->name, GF_LOG_WARNING, errno,
                   DHT_MSG_GET_HOSTNAME_FAILED, "could not find hostname");
    }

    args.this = this;
    args.volname = local_volname;
    args.addr_match = addr_match;
    ret = nufa_find_local_subvol(this, nufa_find_local_brick, &args);
    if (ret) {
        gf_msg(this->name, GF_LOG_INFO, 0, DHT_MSG_SUBVOL_INFO,
               "Unable to find local subvolume, switching "
               "to dht mode");
        nufa_to_dht(this);
    }
    return 0;
}

dht_methods_t dht_methods = {
    .migration_get_dst_subvol = dht_migration_get_dst_subvol,
    .migration_needed = dht_migration_needed,
    .layout_search = dht_layout_search,
};

struct xlator_fops fops = {
    .lookup = nufa_lookup,
    .create = nufa_create,
    .mknod = nufa_mknod,

    .stat = dht_stat,
    .fstat = dht_fstat,
    .truncate = dht_truncate,
    .ftruncate = dht_ftruncate,
    .access = dht_access,
    .readlink = dht_readlink,
    .setxattr = dht_setxattr,
    .getxattr = dht_getxattr,
    .removexattr = dht_removexattr,
    .open = dht_open,
    .readv = dht_readv,
    .writev = dht_writev,
    .flush = dht_flush,
    .fsync = dht_fsync,
    .statfs = dht_statfs,
    .lk = dht_lk,
    .opendir = dht_opendir,
    .readdir = dht_readdir,
    .readdirp = dht_readdirp,
    .fsyncdir = dht_fsyncdir,
    .symlink = dht_symlink,
    .unlink = dht_unlink,
    .link = dht_link,
    .mkdir = dht_mkdir,
    .rmdir = dht_rmdir,
    .rename = dht_rename,
    .inodelk = dht_inodelk,
    .finodelk = dht_finodelk,
    .entrylk = dht_entrylk,
    .fentrylk = dht_fentrylk,
    .xattrop = dht_xattrop,
    .fxattrop = dht_fxattrop,
    .setattr = dht_setattr,
};

struct xlator_cbks cbks = {.forget = dht_forget};
extern int32_t
mem_acct_init(xlator_t *this);

xlator_api_t xlator_api = {
    .init = nufa_init,
    .fini = dht_fini,
    .notify = dht_notify,
    .reconfigure = dht_reconfigure,
    .mem_acct_init = mem_acct_init,
    .op_version = {1}, /* Present from the initial version */
    .fops = &fops,
    .cbks = &cbks,
    .options = dht_options,
    .identifier = "nufa",
    .category = GF_TECH_PREVIEW,
};
